package com.cube.share.base.checker;

import org.springframework.util.Assert;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * In-memory implementation of a duplicate-message checker.
 * <p>
 * Message ids are remembered in a {@link ConcurrentHashMap} together with the time at which
 * they were first seen. A single background thread per instance, started lazily on the first
 * call to {@link #isDuplicate(Serializable)}, periodically evicts ids that are older than the
 * TTL (interpreted as seconds by this class).
 *
 * @param <K> type of the message id
 * @author poker.li
 * @date 2021/8/17 14:40
 */
public class InMemoryDuplicateChecker<K extends Serializable> extends AbstractDuplicateMessageChecker<K> {

    /**
     * Counter used to give every background eviction thread a unique name.
     */
    private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);

    /**
     * Interval between two eviction sweeps, in seconds.
     */
    private final long clearPeriod;

    /**
     * key -> message id, value -> time (epoch milliseconds) at which the id was first seen.
     */
    private final ConcurrentHashMap<K, Long> msgIdMap = new ConcurrentHashMap<>(64);

    /**
     * Whether the background eviction thread of this instance has already been started.
     */
    private final AtomicBoolean checkingProcessStarted = new AtomicBoolean(false);

    /**
     * Creates a checker.
     *
     * @param ttl         time-to-live handed to the superclass; this class treats it as seconds
     * @param clearPeriod interval between eviction sweeps in seconds, must be greater than 0
     * @throws IllegalStateException if {@code clearPeriod} is not greater than 0
     */
    public InMemoryDuplicateChecker(long ttl, long clearPeriod) {
        super(ttl);
        Assert.state(clearPeriod > 0, "clearPeriod must greater than 0");
        this.clearPeriod = clearPeriod;
    }

    /**
     * Starts the background eviction thread if it has not been started yet.
     * Only the first caller wins the compare-and-set; every later call returns immediately.
     */
    private void checkingProcess() {
        if (!checkingProcessStarted.compareAndSet(false, true)) {
            return;
        }

        Thread t = new Thread(() -> {
            try {
                while (true) {
                    // clearPeriod is expressed in seconds, Thread.sleep expects milliseconds.
                    Thread.sleep(clearPeriod * 1000L);
                    evictExpired();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        });
        // The sweeper loops forever; as a daemon it does not keep the JVM alive on shutdown.
        t.setDaemon(true);
        t.setName("InMemoryDuplicateChecker-thread-" + THREAD_COUNT.getAndIncrement());
        t.start();
    }

    /**
     * Removes every message id that was first seen more than {@code ttl} seconds ago.
     */
    private void evictExpired() {
        long now = System.currentTimeMillis();
        for (Map.Entry<K, Long> entry : msgIdMap.entrySet()) {
            Long firstSeen = entry.getValue();
            // 1000L forces long arithmetic for the seconds -> milliseconds conversion.
            if (now - firstSeen > ttl * 1000L) {
                // Conditional remove: leaves the mapping alone if it was replaced concurrently.
                msgIdMap.remove(entry.getKey(), firstSeen);
            }
        }
    }

    /**
     * Records the given message id and reports whether it had been recorded before.
     * <p>
     * Entries are only dropped by the periodic background sweep, so an id keeps being reported
     * as a duplicate until the first sweep that runs after its TTL has elapsed.
     *
     * @param messageId id of the message; {@code null} is never considered a duplicate
     * @return {@code true} if the id is already present in the map, {@code false} otherwise
     */
    @Override
    public boolean isDuplicate(Serializable messageId) {
        if (messageId == null) {
            return false;
        }
        checkingProcess();
        @SuppressWarnings("unchecked") Long timestamp = this.msgIdMap.putIfAbsent((K) messageId, System.currentTimeMillis());
        return timestamp != null;
    }
}
